package com.mimo.logic.message.service.impl;

import java.text.MessageFormat;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Resource;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;

import com.mimo.common.cache.ICache;
import com.mimo.common.logic.code.StatusCode;
import com.mimo.common.logic.dto.msg.BaseDispatchMessage;
import com.mimo.common.logic.dto.msg.DispatchMsgType;
import com.mimo.common.logic.dto.msg.P2PMessage;
import com.mimo.common.logic.dto.msg.Peer2RoomMessage;
import com.mimo.common.logic.dto.msg.Room2PeerMessage;
import com.mimo.common.utils.JsonUtils;
import com.mimo.common.utils.RandomStringUtils;
import com.mimo.logic.binder.LogicMessageDispatchProcessor;
import com.mimo.logic.binder.LogicMessageReceptionProcessor;
import com.mimo.logic.blocking.constants.DenyOperation;
import com.mimo.logic.blocking.service.IAccessiableService;
import com.mimo.logic.message.constants.MessageKeys;
import com.mimo.logic.message.service.IMessageDispatcher;
import com.mimo.logic.message.service.IMessageUpstreamConstraintsService;
import com.mimo.logic.metric.constants.MetricType;
import com.mimo.logic.metric.service.IMetricService;
import com.mimo.logic.room.service.IRoomService;
import com.mimo.logic.user.service.IUserService;

@Service
public class MessageDispatcherImpl implements IMessageDispatcher {
  private static final int MAX_UNACKED_SIZE = 64;

  @Resource(name = "roomCache")
  private ICache<String, String[], String> roomCache;

  @Resource(name = "onDispatchExecutor")
  private Executor onDispatchExecutor;

  @Value("${logic.admin.name:SysAdmin}")
  private String adminName;

  @Autowired
  private RedisTemplate<String, String> redisTemplate;

  @Autowired
  private IUserService userService;

  @Autowired
  private LogicMessageReceptionProcessor receptionProcessor;

  @Autowired
  private IMetricService metricService;

  @Autowired
  private IRoomService roomService;

  @Autowired
  private LogicMessageDispatchProcessor dispatchProcessor;

  @Autowired
  private IAccessiableService userAccessiableService;

  @Autowired
  private IMessageUpstreamConstraintsService messageUpstreamConstraintsService;

  @Override
  public StatusCode dispatch(BaseDispatchMessage msg) {
    // 上行消息频次约束检查
    StatusCode statusCode = checkOnConstaints(msg);

    if (statusCode.isSuccess()) {
      // 操作权限检查
      statusCode = checkOnAccess(msg);

      // 通过层层检测后,消息接收柜台完成收件
      if (statusCode.isSuccess()) {
        receptionProcessor.output().send(MessageBuilder.withPayload(msg).build());
        // 上行消息（个人消息，房间消息）统计
        if (msg.getType() == DispatchMsgType.P2P) {
          metricService.accumulate(MetricType.Msg_P2P_Upstream, msg.getFrom());
        } else if (msg.getType() == DispatchMsgType.Peer2Room) {
          metricService.accumulate(MetricType.Msg_Room_Upstream, msg.getTarget());
        }
      } // end of 柜台收件
    }

    return statusCode;
  }

  /**
   * 柜台收件后，需要进行消息拆分,之后再统一转储到消息Broker
   */
  @StreamListener(LogicMessageReceptionProcessor.INPUT)
  @Override
  public void onDispatch(BaseDispatchMessage msg) {
    switch (msg.getType()) {
      case Peer2Room:
        CompletableFuture.runAsync(() -> this.process((Peer2RoomMessage) msg), onDispatchExecutor);
        break;
      case P2P:
        this.process((P2PMessage) msg);
        break;
      default:
        throw new UnsupportedOperationException();
    }
  }

  /**
   * 点对点消息转储
   *
   * @param evt
   */
  private void process(P2PMessage evt) {
    // ---------------------- 持久化redis ----------------------
    int slot = Math.abs(evt.getId().hashCode() % MAX_SLOT);
    String p2pBucketKey = MessageFormat.format(MessageKeys.LOGIC_MSG_BUCKET_P2P_PATTERN, slot);
    String p2pBucketDetailKey = MessageFormat.format(MessageKeys.LOGIC_MSG_BUCKET_P2P_DETAIL_PATTERN, slot);
    String p2pBelongsKey = MessageFormat.format(MessageKeys.LOGIC_MSG_P2P_BELONGS_PATTERN, slot);
    String p2pUserKey = MessageFormat.format(MessageKeys.LOGIC_MSG_USER_P2P_PATTERN, evt.getTarget());

    byte[] evtId = evt.getId().getBytes();
    byte[] evtData = JsonUtils.toJsonBytes(evt);
    redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
      connection.zAdd(p2pBucketKey.getBytes(), evt.getTimestamp(), evtId);
      connection.hSet(p2pBucketDetailKey.getBytes(), evtId, evtData);
      connection.hSet(p2pBelongsKey.getBytes(), evtId, evt.getTarget().getBytes());
      connection.hSet(p2pUserKey.getBytes(), evtId, evtData);
      return null;
    });
    // ----------------------持久化完成-----------------------------

    // 进行消息转储
    dispatchProcessor.output().send(MessageBuilder.withPayload(evt).build());

    // 目标用户的下行消息（个人消息）维度统计
    metricService.accumulate(MetricType.Msg_P2P_Downstream, evt.getTarget());
  }

  /**
   * 房间消息分发转储,目前对于房间分发，使用的是写扩散的方式
   *
   * @param evt
   */
  private void process(Peer2RoomMessage evt) {
    Room2PeerMessage msg = new Room2PeerMessage();
    BeanUtils.copyProperties(evt, msg, "type", "target");
    msg.setRoomId(evt.getTarget()); // 先填充房间ID
    msg.setOriginalId(evt.getId()); // 填充源消息ID
    String[] members = roomCache.get(evt.getTarget());
    if (members.length > 0) {
      Stream.of(members).forEach(member -> {
        // 此处主要是通过反复的更新msg本身，以减少中间对象的创建，实现对象重用
        msg.setId(RandomStringUtils.uniqueRandom());
        msg.setTarget(member);
        // Broker转储
        dispatchProcessor.output().send(MessageBuilder.withPayload(msg).build());
      });// end of drop in message to MQ

      // 房间下行消息累计
      metricService.accumulate(MetricType.Msg_Room_Downstream, evt.getTarget(), members.length);
    } // end of member length checking
  }

  @Override
  public Collection<BaseDispatchMessage> loadUnacksByUserId(String userId) {
    String p2pUserKey = MessageFormat.format(MessageKeys.LOGIC_MSG_USER_P2P_PATTERN, userId);
    return redisTemplate.<String, String> opsForHash().values(p2pUserKey).stream().limit(MAX_UNACKED_SIZE)
        .map(json -> JsonUtils.parseJson(json, BaseDispatchMessage.class)).collect(Collectors.toList());
  }

  @Override
  public void ack(String userId, String msgId) {
    String userP2PKey = MessageFormat.format(MessageKeys.LOGIC_MSG_USER_P2P_PATTERN, userId);
    if (redisTemplate.<String, String> opsForHash().hasKey(userP2PKey, msgId).booleanValue()) {
      this.remove(msgId);
    }
  }

  @Override
  public void remove(String msgId) { // 目前只支持P2P消息
    int slot = Math.abs(msgId.hashCode() % MAX_SLOT);
    String p2pBucketKey = MessageFormat.format(MessageKeys.LOGIC_MSG_BUCKET_P2P_PATTERN, slot);
    String p2pBucketDetailKey = MessageFormat.format(MessageKeys.LOGIC_MSG_BUCKET_P2P_DETAIL_PATTERN, slot);
    String p2pBelongsKey = MessageFormat.format(MessageKeys.LOGIC_MSG_P2P_BELONGS_PATTERN, slot);

    String user = redisTemplate.<String, String> opsForHash().get(p2pBelongsKey, msgId);

    redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
      connection.zRem(p2pBucketKey.getBytes(), msgId.getBytes());
      connection.hDel(p2pBucketDetailKey.getBytes(), msgId.getBytes());
      if (Objects.nonNull(user)) {
        connection.hDel(p2pBelongsKey.getBytes(), msgId.getBytes());
        connection.hDel(MessageFormat.format(MessageKeys.LOGIC_MSG_USER_P2P_PATTERN, user).getBytes(),
            msgId.getBytes());
      }

      return null;
    });
  }

  @Override
  public Collection<String> getP2PMsgByBucketSlotAndExpireBetween(int slot, long start, long end, long count) {
    Assert.isTrue(end >= start, "End 必须 >= start");
    Assert.isTrue(count > 0, "count 必须大于0");
    Assert.isTrue(slot >= 0 && slot < MAX_SLOT, "slot取值范围[0,100)");
    String msgBucket = MessageFormat.format(MessageKeys.LOGIC_MSG_BUCKET_P2P_PATTERN, slot);
    return redisTemplate.opsForZSet().rangeByScore(msgBucket, start, end, 0, count);
  }

  /**
   * 收到投递消息时频次约束检查,针对P2P和P2Room
   *
   * @param msg
   * @return
   */
  private StatusCode checkOnConstaints(BaseDispatchMessage msg) {
    StatusCode code = StatusCode.Success;

    if (!Objects.equals(adminName, msg.getFrom())
        && !messageUpstreamConstraintsService.checkPeerUpstreamConstraint(msg)) { // 除了管理员，所有上行的消息，都需要做频次约束
      code = StatusCode.PersonMessageCap;
    }

    if (code.isSuccess() && msg instanceof Peer2RoomMessage
        && !messageUpstreamConstraintsService.checkP2RoomConstraint((Peer2RoomMessage) msg)) {// 对于直播间内的上行消息做数量限制
      code = StatusCode.RoomMessageCap;
    }
    return code;
  }

  /**
   * 收到投递消息时的权限检查以及频次约束检查
   *
   * @param msg
   * @return
   */
  private StatusCode checkOnAccess(BaseDispatchMessage msg) {
    StatusCode statusCode = StatusCode.Success;

    boolean isAdmin = Objects.equals(msg.getFrom(), adminName);

    if (!isAdmin) { // 对于非系统管理员角色,必须做权限检查
      statusCode = userAccessiableService.checkAccessiable(msg.getFrom(), null, DenyOperation.GlobalBlock);
    }

    if (statusCode.isSuccess()) {// 如果用户没有被全局封禁的话
      if (!isAdmin) {
        // 全局禁言状态检查
        statusCode = userAccessiableService.checkAccessiable(msg.getFrom(), null, DenyOperation.GlobalMute);
      }

      if (statusCode.isSuccess()) {
        if (msg instanceof P2PMessage) {// 私信消息权限检测
          if (!isAdmin) {
            statusCode = userAccessiableService.checkAccessiable(msg.getFrom(), msg.getTarget(), DenyOperation.P2P);
          }
          if (statusCode.isSuccess() && !userService.isValid(msg.getTarget())) { // 目标用户合法性检验
            statusCode = StatusCode.InvalidPeer;
          }
        } else if (msg instanceof Peer2RoomMessage) {// 直播间内发消息，检测发言权限
          if (!isAdmin) {
            statusCode = userAccessiableService.checkAccessiable(msg.getFrom(), msg.getTarget(),
                DenyOperation.RoomMute);
          }
          if (statusCode.isSuccess()) {
            Peer2RoomMessage p2r = Peer2RoomMessage.class.cast(msg);
            if (!roomService.exists(p2r.getTarget())) {// 检测房间存在的有效性
              statusCode = StatusCode.RoomNotExist;
            } else if (!isAdmin && !roomService.isMemberOf(msg.getTarget(), msg.getFrom())) { // 检测成员和房间的归属关系
              statusCode = StatusCode.RoomUncheckIn;
            }
          }
        }
      }

    }

    return statusCode;
  }
}
